#include "duckdb.hpp"

#include "pgduckdb/pgduckdb_utils.hpp"
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_duckdb.hpp"
#include "pgduckdb/pgduckdb_xact.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/pgduckdb_userdata_cache.hpp"
#include "pgduckdb/pg/functions.hpp"
#include "pgduckdb/utility/cpp_wrapper.hpp"

extern "C" {
#include "postgres.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "utils/builtins.h"
}

namespace pgduckdb {

// Returns a foreign server name that is currently unused, derived from
// `server_prefix`.
//
// If no foreign server named exactly `server_prefix` exists, the prefix itself
// is returned. Otherwise the candidates "<prefix>_1", "<prefix>_2", ... are
// probed in increasing order and the first one for which no foreign server is
// found is returned.
//
// Note: the probing is not done under a lock, so in principle another backend
// could create a server with the same name concurrently. This is considered
// unlikely to happen.
std::string
FindServerName(const char *server_prefix) {
	if (get_foreign_server_oid(server_prefix, true) == InvalidOid) {
		return server_prefix;
	}

	const std::string stem = std::string(server_prefix) + "_";
	for (uint32_t suffix = 1;; ++suffix) {
		std::string candidate = stem + std::to_string(suffix);
		if (get_foreign_server_oid(candidate.c_str(), true) == InvalidOid) {
			return candidate;
		}
	}
}

namespace pg {

// Builds the inside of an `OPTIONS (...)` clause from consecutive function
// arguments.
//
// The argument at position `start + i` of `fcinfo` is read (as a string) for
// `names[i]`. Arguments whose string value is empty are skipped; every other
// one is emitted as `<name> <quoted value>`, with entries separated by ", ".
// Returns an empty string when nothing was emitted.
std::string
ReadOptions(FunctionCallInfo fcinfo, int start, const std::vector<std::string> &names) {
	std::string options;
	for (size_t idx = 0; idx < names.size(); ++idx) {
		const auto value = GetArgString(fcinfo, start + static_cast<int>(idx));
		if (value.empty()) {
			continue;
		}

		// Only put a separator in front of the second and later entries.
		if (!options.empty()) {
			options += ", ";
		}

		options += names[idx];
		options += " ";
		options += duckdb::KeywordHelper::WriteQuoted(value);
	}
	return options;
}

} // namespace pg
} // namespace pgduckdb

extern "C" {

// Passes the text given as the first argument to DuckDBQueryOrThrow and
// reports the stringified result as a NOTICE. Returns true when the call
// completes.
DECLARE_PG_FUNCTION(pgduckdb_raw_query) {
	const char *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
	const std::string rendered = pgduckdb::DuckDBQueryOrThrow(sql)->ToString();
	elog(NOTICE, "result: %s", rendered.c_str());
	PG_RETURN_BOOL(true);
}

// SQL-callable wrapper that returns pgduckdb::IsMotherDuckEnabled() as a
// boolean.
DECLARE_PG_FUNCTION(pgduckdb_is_motherduck_enabled) {
	const bool enabled = pgduckdb::IsMotherDuckEnabled();
	PG_RETURN_BOOL(enabled);
}

/*
 * Enables MotherDuck for the current user.
 *
 * Arguments (both read as strings):
 *   0: token            - the MotherDuck token, or the "::FROM_ENV::"
 *                         placeholder to rely on the environment.
 *   1: default_database - optional; empty means "not provided".
 *
 * Creates the `motherduck` foreign server when it does not exist yet, creates
 * a user mapping for CURRENT_USER holding the token, commits, and finally
 * starts the background worker if needed.
 *
 * Returns false (after a NOTICE) when MotherDuck is already enabled, and true
 * once this call has enabled it.
 *
 * Raises an ERROR when:
 *   - the token is the "::FROM_ENV::" placeholder but neither MOTHERDUCK_TOKEN
 *     nor motherduck_token is set in the environment;
 *   - a default_database is given while the server already exists;
 *   - one of the utility statements does not return SPI_OK_UTILITY.
 *
 * The call is rejected inside a transaction block (PreventInTransactionBlock)
 * because the function itself commits through SPI_commit().
 */
DECLARE_PG_FUNCTION(pgduckdb_enable_motherduck) {
	pgduckdb::pg::PreventInTransactionBlock("duckdb.enable_motherduck()");

	if (pgduckdb::IsMotherDuckEnabled()) {
		elog(NOTICE, "MotherDuck is already enabled");
		PG_RETURN_BOOL(false);
	}

	auto token = pgduckdb::pg::GetArgString(fcinfo, 0);
	auto default_database = pgduckdb::pg::GetArgString(fcinfo, 1);

	// If no token provided, check that token exists in the environment
	if (token == "::FROM_ENV::" && getenv("MOTHERDUCK_TOKEN") == nullptr && getenv("motherduck_token") == nullptr) {
		elog(ERROR, "No token was provided and `motherduck_token` environment variable was not set");
	}

	// Non-atomic connection: required to be allowed to call SPI_commit() below.
	SPI_connect_ext(SPI_OPT_NONATOMIC);

	if (pgduckdb::GetMotherduckForeignServerOid() == InvalidOid) {
		std::string query = "CREATE SERVER motherduck TYPE 'motherduck' FOREIGN DATA WRAPPER duckdb";
		if (default_database.empty()) {
			query += ";";
		} else {
			query += " OPTIONS (default_database " + duckdb::KeywordHelper::WriteQuoted(default_database) + ");";
		}
		auto ret = SPI_exec(query.c_str(), 0);
		if (ret != SPI_OK_UTILITY) {
			elog(ERROR, "Could not create 'motherduck' SERVER: %s", SPI_result_code_string(ret));
		}
	} else if (!default_database.empty()) {
		// TODO: check if it was set to the same value and update it or only error in that case
		elog(ERROR, "Cannot provide a default_database: because the server already exists");
	}

	{
		// Create mapping for current user
		auto query = "CREATE USER MAPPING FOR CURRENT_USER SERVER motherduck OPTIONS (token " +
		             duckdb::KeywordHelper::WriteQuoted(token) + ");";
		auto ret = SPI_exec(query.c_str(), 0);
		if (ret != SPI_OK_UTILITY) {
			elog(ERROR, "Could not create USER MAPPING for current user: %s", SPI_result_code_string(ret));
		}
	}

	/*
	 * Now we need to start the background worker, so that the table sync
	 * starts. We need to first commit though, otherwise if the background
	 * worker starts quick enough, it will not see that we enabled motherduck
	 * (i.e. it does not see the motherduck SERVER).
	 */
	SPI_commit();
	SPI_finish();
	pgduckdb::StartBackgroundWorkerIfNeeded();

	/*
	 * The early exit above returns a boolean, so the success path must too.
	 * PG_RETURN_VOID() would yield a zero Datum, which reads as `false` and is
	 * indistinguishable from the "already enabled" case.
	 */
	PG_RETURN_BOOL(true);
}

/*
 * Creates a foreign server plus a user mapping for CURRENT_USER describing a
 * simple 's3', 'gcs' or 'r2' secret, and returns the name of the server that
 * was created.
 *
 * Arguments (all read as strings):
 *   0: type (matched case-insensitively against s3 / gcs / r2)
 *   1: key id, 2: secret, 3: session token (omitted from the mapping if empty)
 *   4..10: region, url_style, provider, endpoint, scope, validation, use_ssl
 *          (each one is only added to the server OPTIONS when non-empty)
 *
 * Raises an ERROR for an unsupported type or when one of the utility
 * statements does not return SPI_OK_UTILITY.
 */
DECLARE_PG_FUNCTION(pgduckdb_create_simple_secret) {
	const auto type = pgduckdb::pg::GetArgString(fcinfo, 0);
	const auto lc_type = duckdb::StringUtil::Lower(type);
	const bool supported_type = lc_type == "r2" || lc_type == "s3" || lc_type == "gcs";
	if (!supported_type) {
		elog(ERROR,
		     "Invalid type '%s': this helper only supports 's3', 'gcs' or 'r2'. Please refer to the documentation to "
		     "create advanced secrets.",
		     type.c_str());
	}

	const auto key = pgduckdb::pg::GetArgString(fcinfo, 1);
	const auto secret = pgduckdb::pg::GetArgString(fcinfo, 2);
	const auto session_token = pgduckdb::pg::GetArgString(fcinfo, 3);
	const std::string secret_name = "simple_" + lc_type + "_secret";

	SPI_connect();
	const std::string server_name = pgduckdb::FindServerName(secret_name.c_str());

	// Step 1: the server, carrying the non-secret options.
	std::string create_server_query =
	    "CREATE SERVER " + server_name + " TYPE '" + type + "' FOREIGN DATA WRAPPER duckdb";
	const auto options = pgduckdb::pg::ReadOptions(
	    fcinfo, 4, {"region", "url_style", "provider", "endpoint", "scope", "validation", "use_ssl"});
	if (!options.empty()) {
		create_server_query += " OPTIONS (" + options + ")";
	}

	const int server_rc = SPI_exec(create_server_query.c_str(), 0);
	if (server_rc != SPI_OK_UTILITY) {
		elog(ERROR, "Could not create '%s' SERVER: %s", type.c_str(), SPI_result_code_string(server_rc));
	}

	// Step 2: the user mapping, carrying the credentials.
	std::string create_mapping_query = "CREATE USER MAPPING FOR CURRENT_USER SERVER " + server_name +
	                                   " OPTIONS (KEY_ID " + duckdb::KeywordHelper::WriteQuoted(key) + ", SECRET " +
	                                   duckdb::KeywordHelper::WriteQuoted(secret);
	if (!session_token.empty()) {
		create_mapping_query += ", session_token " + duckdb::KeywordHelper::WriteQuoted(session_token);
	}
	create_mapping_query += ");";

	const int mapping_rc = SPI_exec(create_mapping_query.c_str(), 0);
	if (mapping_rc != SPI_OK_UTILITY) {
		elog(ERROR, "Could not create '%s' USER MAPPING: %s", type.c_str(), SPI_result_code_string(mapping_rc));
	}

	SPI_finish();

	PG_RETURN_TEXT_P(cstring_to_text(server_name.c_str()));
}

/*
 * Creates a foreign server of TYPE 'azure' plus a user mapping for
 * CURRENT_USER that stores the connection string, and returns the name of the
 * server that was created.
 *
 * Arguments (read as strings):
 *   0: connection string
 *   1: scope (only added to the server OPTIONS when non-empty)
 *
 * Raises an ERROR when one of the utility statements does not return
 * SPI_OK_UTILITY.
 */
DECLARE_PG_FUNCTION(pgduckdb_create_azure_secret) {
	// XXX: Should this install `azure` if not already installed? Or fail if not?
	const auto connection_string = pgduckdb::pg::GetArgString(fcinfo, 0);

	SPI_connect();
	const std::string server_name = pgduckdb::FindServerName("azure_secret");

	std::string create_server_query = "CREATE SERVER " + server_name + " TYPE 'azure' FOREIGN DATA WRAPPER duckdb";
	const auto options = pgduckdb::pg::ReadOptions(fcinfo, 1, {"scope"});
	if (!options.empty()) {
		create_server_query += " OPTIONS (" + options + ")";
	}

	const int server_rc = SPI_exec(create_server_query.c_str(), 0);
	if (server_rc != SPI_OK_UTILITY) {
		elog(ERROR, "Could not create 'azure' SERVER: %s", SPI_result_code_string(server_rc));
	}

	std::ostringstream create_mapping_query;
	create_mapping_query << "CREATE USER MAPPING FOR CURRENT_USER SERVER " << server_name
	                     << " OPTIONS (connection_string " << duckdb::KeywordHelper::WriteQuoted(connection_string)
	                     << ");";
	const std::string mapping_sql = create_mapping_query.str();

	const int mapping_rc = SPI_exec(mapping_sql.c_str(), 0);
	if (mapping_rc != SPI_OK_UTILITY) {
		elog(ERROR, "Could not create 'azure' USER MAPPING: %s", SPI_result_code_string(mapping_rc));
	}

	SPI_finish();
	PG_RETURN_TEXT_P(cstring_to_text(server_name.c_str()));
}

/*
 * We need these dummy cache functions so that people are able to load the
 * new pg_duckdb.so file with an old SQL version (where these functions still
 * exist). People should then upgrade the SQL part of the extension using the
 * command described in the error message. Once we believe no-one is on these old
 * versions anymore we can remove these dummy functions.
 */
// Dummy replacement for the removed duckdb.cache function: it always raises an
// ERROR telling the user to update the extension.
DECLARE_PG_FUNCTION(cache) {
	elog(ERROR, "duckdb.cache is not supported anymore, please run 'ALTER EXTENSION pg_duckdb UPDATE'");
}

// Dummy replacement for the removed duckdb.cache_info function: it always
// raises an ERROR telling the user to update the extension.
DECLARE_PG_FUNCTION(cache_info) {
	elog(ERROR, "duckdb.cache_info is not supported anymore, please run 'ALTER EXTENSION pg_duckdb UPDATE'");
}

// Dummy replacement for the removed duckdb.cache_delete function: it always
// raises an ERROR telling the user to update the extension.
DECLARE_PG_FUNCTION(cache_delete) {
	elog(ERROR, "duckdb.cache_delete is not supported anymore, please run 'ALTER EXTENSION pg_duckdb UPDATE'");
}

// Resets the DuckDB manager (DuckDBManager::Reset) and returns true.
// The call is rejected inside a transaction block; see the comment below.
DECLARE_PG_FUNCTION(pgduckdb_recycle_ddb) {
	pgduckdb::RequireDuckdbExecution();
	/*
	 * We cannot safely run this in a transaction block, because a DuckDB
	 * transaction might have already started. Recycling the database would
	 * violate our assumptions about DuckDB's transaction lifecycle.
	 */
	pgduckdb::pg::PreventInTransactionBlock("duckdb.recycle_ddb()");
	pgduckdb::DuckDBManager::Reset();
	PG_RETURN_BOOL(true);
}

// Always raises an ERROR: creating a duckdb.row value is not supported.
// NOTE(review): presumably registered as the input function of duckdb.row —
// confirm against the extension's SQL.
DECLARE_PG_FUNCTION(duckdb_row_in) {
	elog(ERROR, "Creating the duckdb.row type is not supported");
}

// Always raises an ERROR: a duckdb.row cannot be converted to a string.
// NOTE(review): presumably registered as the output function of duckdb.row —
// confirm against the extension's SQL.
DECLARE_PG_FUNCTION(duckdb_row_out) {
	elog(ERROR, "Converting a duckdb.row to a string is not supported");
}

// Always raises an ERROR: creating a duckdb.struct value is not supported.
// NOTE(review): presumably registered as the input function of duckdb.struct —
// confirm against the extension's SQL.
DECLARE_PG_FUNCTION(duckdb_struct_in) {
	elog(ERROR, "Creating the duckdb.struct type is not supported");
}

// Delegates to Postgres' textout() with the unchanged call info and returns
// its result.
DECLARE_PG_FUNCTION(duckdb_struct_out) {
	return textout(fcinfo);
}

// Delegates to Postgres' textin() with the unchanged call info and returns
// its result.
DECLARE_PG_FUNCTION(duckdb_unresolved_type_in) {
	return textin(fcinfo);
}

// Delegates to Postgres' textout() with the unchanged call info and returns
// its result.
DECLARE_PG_FUNCTION(duckdb_unresolved_type_out) {
	return textout(fcinfo);
}

// Always raises an ERROR: unresolved duckdb types cannot be used by the
// Postgres executor.
// NOTE(review): presumably the shared implementation behind the operators
// defined on the unresolved type — confirm against the extension's SQL.
DECLARE_PG_FUNCTION(duckdb_unresolved_type_operator) {
	elog(ERROR, "Unresolved duckdb types cannot be used by the Postgres executor");
}

// Always raises an ERROR naming the SQL function through which it was
// invoked. The name is obtained by running regprocout on the OID stored in
// the call's FmgrInfo (fcinfo->flinfo->fn_oid).
DECLARE_PG_FUNCTION(duckdb_only_function) {
	// DirectFunctionCall1 takes a Datum, so convert the Oid explicitly
	// instead of relying on an implicit integer conversion.
	const Oid fn_oid = fcinfo->flinfo->fn_oid;
	const char *function_name = DatumGetCString(DirectFunctionCall1(regprocout, ObjectIdGetDatum(fn_oid)));
	elog(ERROR, "Function '%s' only works with DuckDB execution", function_name);
}

// Always raises an ERROR: creating a duckdb.union value is not supported.
// NOTE(review): presumably registered as the input function of duckdb.union —
// confirm against the extension's SQL.
DECLARE_PG_FUNCTION(duckdb_union_in) {
	elog(ERROR, "Creating the duckdb.union type is not supported");
}

// Delegates to Postgres' textout() with the unchanged call info and returns
// its result.
DECLARE_PG_FUNCTION(duckdb_union_out) {
	return textout(fcinfo);
}

// Always raises an ERROR: creating a duckdb.map value is not supported.
// NOTE(review): presumably registered as the input function of duckdb.map —
// confirm against the extension's SQL.
DECLARE_PG_FUNCTION(duckdb_map_in) {
	elog(ERROR, "Creating the duckdb.map type is not supported");
}

// Delegates to Postgres' textout() with the unchanged call info and returns
// its result.
DECLARE_PG_FUNCTION(duckdb_map_out) {
	return textout(fcinfo);
}

// Test helper: feeds the first argument (read as a string) through
// pgduckdb::AppendEscapedUri and returns whatever was written to the stream
// as a text value.
DECLARE_PG_FUNCTION(pgduckdb_test_escape_uri) {
	const auto raw_uri = pgduckdb::pg::GetArgString(fcinfo, 0);

	std::ostringstream escaped;
	pgduckdb::AppendEscapedUri(escaped, raw_uri.c_str());

	PG_RETURN_TEXT_P(cstring_to_text(escaped.str().c_str()));
}

} // extern "C"
